Как вы можете видеть в журналах ошибок, эта ошибка из вашего кластера Kafka.Эта проблема возникает, когда память прямого буфера брокера Kafka превышает размер кучи , назначенный JVM.Прямая буферная память выделяется из кучи JVM, как того требует приложение.Когда вы используете параллелизм> 1, несколько задач Flink, min (Количество слотов Flink, Количество разделов Kafka) будут одновременно использовать данные из Kafka, что приведет к большему использованию размера кучи брокеров Kafka по сравнениюкогда параллелизм равен единице и произойдет так называемая ошибка.Стандартным решением является увеличение размера кучи, доступного для Kafka Brokers, путем добавления переменной KAFKA_HEAP_OPTS в файл env Kafka или в качестве переменной среды ОС .Например, добавьте следующую строку, чтобы установить размер кучи равным 2 ГБ:
export KAFKA_HEAP_OPTS="-Xms2G -Xmx2G"
Но в вашем случае, когда нет доступа к брокеру Kafka (в зависимости от вашего вопроса), вы можете уменьшить количествозапись возвращается в одном вызове poll (), поэтому потребность в памяти кучи у брокеров будет уменьшена.(Это не стандартное решение, я рекомендую просто исчезнуть ошибка).
Из этого ответа :
Потребители Kafka обрабатывают данные о невыполненных работах по следующим двум параметрам:
max.poll.interval.ms
Максимальная задержка между вызовами опроса () при использовании управления группами потребителей.Это накладывает верхнюю границу на количество времени, в течение которого потребитель может бездействовать до получения большего количества записей.Если poll () не вызывается до истечения этого тайм-аута, то считается, что потребитель потерпел неудачу, и группа будет перебалансирована, чтобы переназначить разделы другому участнику.Значение по умолчанию - 300000.
max.poll.records
Максимальное количество записей, возвращаемых за один вызов poll ().Значение по умолчанию - 500.
Игнорирование установки вышеупомянутых двух параметров в соответствии с требованием может привести к опросу максимальных данных, которые потребитель не сможет обработать с помощью доступных ресурсов, что приведет к OutOfMemory или к отказусовершать смещение потребителя в разы.Следовательно, всегда желательно использовать параметры max.poll.records и max.poll.interval.ms.
Поэтому для теста уменьшите значение max.poll.записывает , например, 250 и проверяет, не произойдет ли пока ошибка.
Properties properties = new Properties();
properties.setProperty("bootstrap.servers", BOOTSTRAPSERVERS);
properties.setProperty("group.id", ID);
properties.setProperty("key.deserializer", Serializer);
properties.setProperty("value.deserializer", Deserializer);
properties.setProperty("max.poll.records", "250");
FlinkKafkaConsumer08<String> myConsumer =
new FlinkKafkaConsumer08<>("topic", new SimpleStringSchema(), properties);